62ddca638ae76918f3a0a5fac4ca8fbdb1c7fb9a,runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java,CreateStreamTest,testLateDataAccumulating,#,83
Before Change
// Build the pipeline under test from the shared pipeline rule.
Pipeline p = pipelineRule.createPipeline();
// Single timestamp (constructed from 0) that every element below is stamped with.
Instant instant = new Instant(0);
// Scripted sequence of batches and watermark advances. In this version the stream's
// element type is TimestampedValue<Integer>, i.e. the timestamped wrappers are the elements.
CreateStream<TimestampedValue<Integer>> source =
CreateStream.<TimestampedValue<Integer>>withBatchInterval(pipelineRule.batchDuration())
// First batch is given no elements.
.nextBatch()
// The following batch is preceded by a watermark advance to instant + 6 minutes.
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6)))
// Values 1, 2, 3, all stamped with `instant`.
.nextBatch(
TimestampedValue.of(1, instant),
TimestampedValue.of(2, instant),
TimestampedValue.of(3, instant))
// The following batch is preceded by a watermark advance to instant + 20 minutes.
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
// These elements are late but within the allowed lateness
// (NOTE(review): the allowed-lateness setting is not visible in this excerpt - confirm).
.nextBatch(
TimestampedValue.of(4, instant),
TimestampedValue.of(5, instant))
// These elements are droppably late
// (the watermark is advanced to infinity before this final batch is emitted).
.advanceNextBatchWatermarkToInfinity()
// Values -1, -2, -3, again stamped with `instant`.
.nextBatch(
TimestampedValue.of(-1, instant),
TimestampedValue.of(-2, instant),
TimestampedValue.of(-3, instant));
PCollection<Integer> windowed = p
.apply(source).setCoder(TimestampedValue.TimestampedValueCoder.of(VarIntCoder.of()))
.apply(ParDo.of(new OnlyValue<Integer>()))
.apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(2)))
After Change
// Build the pipeline under test from the shared pipeline rule.
Pipeline p = pipelineRule.createPipeline();
// Single timestamp (constructed from 0) that every element below is stamped with.
Instant instant = new Instant(0);
// Scripted sequence of batches and watermark advances. The stream is declared as
// CreateStream<Integer> and is created with an explicit VarIntCoder plus the rule's
// batch duration; elements are still handed to nextBatch(...) as TimestampedValue instances.
CreateStream<Integer> source =
CreateStream.of(VarIntCoder.of(), pipelineRule.batchDuration())
// First batch is explicitly empty.
.emptyBatch()
// The following batch is preceded by a watermark advance to instant + 6 minutes.
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(6)))
// Values 1, 2, 3, all stamped with `instant`.
.nextBatch(
TimestampedValue.of(1, instant),
TimestampedValue.of(2, instant),
TimestampedValue.of(3, instant))
// The following batch is preceded by a watermark advance to instant + 20 minutes.
.advanceWatermarkForNextBatch(instant.plus(Duration.standardMinutes(20)))
// These elements are late but within the allowed lateness
// (NOTE(review): the allowed-lateness setting is not visible in this excerpt - confirm).
.nextBatch(
TimestampedValue.of(4, instant),
TimestampedValue.of(5, instant))
// These elements are droppably late
// (the watermark is advanced to infinity before this final batch is emitted).
.advanceNextBatchWatermarkToInfinity()
// Values -1, -2, -3, again stamped with `instant`.
.nextBatch(
TimestampedValue.of(-1, instant),
TimestampedValue.of(-2, instant),
TimestampedValue.of(-3, instant));
PCollection<Integer> windowed = p
.apply(source)
.apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(2)))